Fivetran で S3 を Destination に設定し作成された Iceberg テーブルを Snowflake から参照してみた
はじめに
Fivetran では、Destination に Amazon S3 を設定することが可能です。この際、宛先テーブルは AWS Glue データカタログの機能を使用した Apache Iceberg 形式のテーブルとして作成されます。
Snowflake では Iceberg テーブルとしてテーブルを作成しデータレイクに対してクエリする機能を提供しています。※執筆時点でパブリックプレビュー
またこの際 Snowflake では以下のカタログオプションをサポートしています。
- Snowflake を Iceberg カタログとして使用
- カタログ統合 を使用し外部の Iceberg カタログに接続
カタログ統合オブジェクトを使用することで、AWS Glue データカタログの機能へのアクセスも可能となります。ここではカタログ統合経由で Iceberg テーブルを Snowflake から参照してみました。
前提条件
以下の環境を使用しています。
- データソース
- RDS for SQL Server
- SQL Server Express Edition
- 2019 15.00.4365.2v.1
- インスタンスタイプ:db.t3.micro
- RDS for SQL Server
- Destination
- Amazon S3
- 同期方法
- Fivetran Teleport Sync
データソースへの Fivetran からの接続設定手順は以下の記事をご参照ください。
事前準備
検証用として SQLServer 上で以下のデータベース・テーブルを作成します。ここでは少し多めに1000件のレコードを追加しています。データ生成のコマンドは ChatGPT に作成してもらいました。
- データベースの作成
CREATE DATABASE testdb; GO
- テーブルの作成、データの追加
-- テーブルの作成 CREATE TABLE sampledata ( id int primary key, randomnumber int, randomdate date, randomstring varchar(100) ); -- ランダムなデータの挿入 ;WITH cte as ( SELECT TOP (1000) row_number() over (order by (select null)) as rownum FROM sys.columns a CROSS JOIN sys.columns b ) INSERT INTO sampledata (id, randomnumber, randomdate, randomstring) SELECT rownum, abs(checksum(newid())) % 1000, -- 0から999までのランダムな数 dateadd(day, abs(checksum(newid())) % 3650, '2000-01-01'), -- 過去10年間のランダムな日付 left(convert(varchar(255), newid()), 10) -- ランダムな文字列 FROM cte;
確認
1> SELECT COUNT(*) FROM sampledata; 2> GO ----------- 1000 (1 行処理されました)
データの内容
1> SELECT TOP 10 * FROM sampledata; 2> GO id randomnumber randomdate randomstring ----------- ------------ ---------------- ---------------------------------------------------------------------------------------------------- 1 337 2006-02-18 B84E3B7A-A 2 19 2009-09-28 B911ED2D-7 3 216 2005-01-30 3C48E674-E 4 778 2006-10-19 5FE4AC7C-1 5 998 2001-02-28 D6A79E15-F 6 304 2004-03-31 5C8AEA83-9 7 447 2006-10-18 CD08016E-1 8 636 2000-11-13 0226AC41-6 9 365 2003-01-02 00B66B98-3 10 718 2004-08-01 6886EFE3-1 (10 行処理されました)
Fivetran 側の作業
上記の記事通り、データソース・Destination を設定し初期同期を行います。Destination の設定は下図の通りで Prefix は特に設定していません。
同期が完了すると下図の通り、ここでは Fivetran のコネクタ側で Destination schema prefix として指定した値とデータベーススキーマからなるプレフィックス配下に、テーブル名のパスが追加されていました。
さらにこの対象のテーブル名のパス配下はdata
,metadata
パスに分かれています。
名称通りデータファイルはs3://<バケット名>/<Destination schema prefix_スキーマ名>/<テーブル名>/data
に Parquet として生成されます。
Glue 側も確認すると Apache Iceberg 形式として Glue Data Catalog が追加されています。
データファイル自体は S3 にあるため、ストレージ統合を作成し Snowflake からクエリすることも可能です。スキーマ検出をしてみると下図のようになっていました。
--ストレージ統合 CREATE STORAGE INTEGRATION s3_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = 'S3' ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<IAM Role>' STORAGE_ALLOWED_LOCATIONS = ('s3://<バケット名>/'); --ストレージ統合の情報を取得 DESC INTEGRATION s3_int; --検証用DBの作成 CREATE DATABASE test; USE SCHEMA test.public; --ファイルフォーマットの作成 CREATE OR REPLACE FILE FORMAT my_parquet_format TYPE = PARQUET; --ステージの作成 CREATE OR REPLACE STAGE my_s3_stage STORAGE_INTEGRATION = s3_int URL = 's3://<バケット名>/sql_server_rds_dbo/sampledata/data' FILE_FORMAT = my_parquet_format; list @my_s3_stage; --スキーマ検出 SELECT * FROM TABLE( INFER_SCHEMA( LOCATION=>'@my_s3_stage' , FILE_FORMAT=>'my_parquet_format' ) );
Fivetran 経由での同期のため「_fivetran_deleted」と「_fivetran_synced」カラムが追加されています。
レコードの追加
どのようにデータファイルが追加・変更されるかを確認したかったので、まずはこのまま以下のコマンドでデータソース(SQLServer)側でレコードを 1000件追加してみます。これにより id の値が1001~ 2000のレコードが追加されます。
-- ランダムなデータをさらに1000レコード挿入 ;WITH cte AS ( SELECT TOP (1000) ROW_NUMBER() OVER (ORDER BY (SELECT NULL)) AS RowNum FROM sys.columns a CROSS JOIN sys.columns b ), max_existing_id AS ( SELECT COALESCE(MAX(id), 0) AS max_id FROM sampledata ) INSERT INTO sampledata (id, randomnumber, randomdate, randomstring) SELECT m.max_id + c.rownum, ABS(CHECKSUM(NEWID())) % 1000, -- 0から999までのランダムな数 DATEADD(day, ABS(CHECKSUM(NEWID())) % 3650, '2000-01-01'), -- 過去10年間のランダムな日付 LEFT(CONVERT(VARCHAR(255), NEWID()), 10) -- ランダムな文字列 FROM cte c, max_existing_id m;
確認
1> SELECT COUNT(*) FROM sampledata; 2> GO ----------- 2000 (1 行処理されました)
Fivetran による同期が完了すると S3 側でデータファイルの追加が確認できます。
Snowflake からステージ経由で各ファイルの id の最小・最大値をクエリして確認してみます。
SELECT MIN($1:id::NUMBER) AS min_id, MAX($1:id::NUMBER) AS max_id, METADATA$FILENAME AS filename FROM @my_s3_stage GROUP BY filename;
新規追加された Parquet ファイルの id 値は1001~2000 と意図する結果となっていました。
Snowflake 上で Iceberg テーブルを作成する
ここまでは、ステージのデータファイルに対して直接クエリしていたので以降の手順で Snowflake における Iceberg テーブルを作成し、S3 データレイクに対してクエリしてみます。Iceberg テーブルは以下の手順で作成します。
- Iceberg テーブル用に外部ボリュームを構成
- カタログ統合を作成
それぞれ基本的な流れはストレージ統合オブジェクトを作成する手順と同様です。
Iceberg テーブル用に外部ボリュームを構成する
はじめに外部ボリュームを作成します。ドキュメントは以下です。
Snowflake における Iceberg テーブルでは、データとメタデータファイルは外部のクラウドストレージ(Amazon S3、Google Cloud Storage、または Azure Storage)に保存されます。このうちメタデータファイルは、外部アカウントのストレージや Snowflake 管理のストレージを使用することも可能です。
データファイルに Snowflake からアクセスするためには外部ボリュームを定義します。下図は公式ドキュメントからの引用ですが、ストレージに対する記載の権限が必要です。メタデータファイルをどこで管理するかによって必要な権限が異なります。
ここでは公式ドキュメント記載のポリシー雛形をそのまま使用し以下の IAM ポリシーを定義しました。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3:::<バケット名>/sql_server_rds_dbo/*" }, { "Effect": "Allow", "Action": [ "s3:ListBucket", "s3:GetBucketLocation" ], "Resource": "arn:aws:s3:::<バケット名>", "Condition": { "StringLike": { "s3:prefix": [ "sql_server_rds_dbo/*" ] } } } ] }
あわせて IAM ロールも作成し、このポリシーをアタッチしておきます。
次に Snowflake 側で以下のコマンドを実行し、外部ボリュームを作成します。
CREATE OR REPLACE EXTERNAL VOLUME exvol STORAGE_LOCATIONS = ( ( NAME = 'my-s3-us-east-2' STORAGE_PROVIDER = 'S3' STORAGE_BASE_URL = 's3://<バケット名>/sql_server_rds_dbo/' STORAGE_AWS_ROLE_ARN = '<上記のポリシーをアタッチしたIAMロールのArn>' ENCRYPTION=(TYPE='AWS_SSE_S3') ) );
外部ボリュームはアカウントレベルのオブジェクトでデフォルトでは ACCOUNTADMIN のみで作成可能です。NAME
で指定する値はアカウント内で一意の名称である必要があります。
CREATE EXTERNAL VOLUME | Snowflake DOCUMENTATION
外部ボリュームを作成後、以下のコマンドで Snowflake アカウント側の AWS IAM ユーザーの情報を取得します。
DESC EXTERNAL VOLUME exvol;
出力のSTORAGE_AWS_IAM_USER_ARN
,STORAGE_AWS_EXTERNAL_ID
プロパティの値を使用し、上記のポリシーをアタッチした IAM ロールの信頼関係を編集します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "AWS": "<STORAGE_AWS_IAM_USER_ARN>" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>" } } } ] }
外部ボリュームの構成手順は以上になります。
カタログ統合を作成する
続けて、ここでは AWS Glue データカタログを使用するため、カタログ統合オブジェクトを作成します。手順は以下に記載があります。
Snowflake からテーブルに関する情報にアクセスするために、Glue データカタログに対する以下の権限を持つ IAM ロールを作成します。
glue:GetTable
glue:GetTables
ドキュメントに記載がある通り以下のポリシーを作成します。accountid
を自身の id に置き換えます。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowGlueCatalogTableAccess", "Effect": "Allow", "Action": [ "glue:GetTable", "glue:GetTables" ], "Resource": [ "arn:aws:glue:*:<accountid>:table/*/*", "arn:aws:glue:*:<accountid>:catalog", "arn:aws:glue:*:<accountid>:database/<database-name>" ] } ] }
あわせて IAM ロールも作成し、このポリシーをアタッチしておきます。
次に Snowflake 側で以下のコマンドを実行しカタログ統合を作成します。
CREATE CATALOG INTEGRATION glueCatalogInt CATALOG_SOURCE=GLUE CATALOG_NAMESPACE='sql_server_rds_dbo' TABLE_FORMAT=ICEBERG GLUE_AWS_ROLE_ARN='<上記のポリシーをアタッチしたIAMロールのArn>' GLUE_CATALOG_ID='<AWSアカウントのID>' GLUE_REGION='us-east-2' ENABLED=TRUE;
カタログ統合もアカウントレベルのオブジェクトで、デフォルトでは ACCOUNTADMIN のみで作成可能です。 CREATE CATALOG INTEGRATION | Snowflake DOCUMENTATION
外部ボリュームを作成後、以下のコマンドで Snowflake アカウント側の AWS IAM ユーザーの情報を取得します。
DESCRIBE CATALOG INTEGRATION glueCatalogInt;
出力のGLUE_IAM_USER_ARN
,GLUE_AWS_EXTERNAL_ID
プロパティの値を使用し、上記のポリシーをアタッチした IAM ロールの信頼関係を編集します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "AWS": "<glue_iam_user_arn>" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "sts:ExternalId": "<glue_aws_external_id>" } } } ] }
カタログ統合の構成手順は以上です。
Icebergテーブルを作成する
さいごにこれまで作成したオブジェクトを使用して Iceberg テーブルを作成します。作成には以下のコマンドを使用しました。
CATALOG_TABLE_NAME
に Glue 上のテーブル名を指定します。
CREATE ICEBERG TABLE myGlueTable EXTERNAL_VOLUME='exvol' CATALOG='glueCatalogInt' CATALOG_TABLE_NAME='sampledata';
Icebergテーブルを作成する | Snowflake DOCUMENTATION
また、上記のドキュメントからの引用ですが、作成には下図の権限が必要です。
コマンド実行後、通常のテーブルのようにクエリすることができます。
SELECT * FROM mygluetable;
SELECT COUNT(*) FROM mygluetable;
Icebergテーブルへのクエリ権限
他のロールに Iceberg テーブルに対するクエリを許可するには、以下の権限を付与または継承させます。
- テーブルに関連付けられた外部ボリュームに対する USAGE 権限
- 本記事のように Snowflake で管理されていないカタログの場合は、カタログ統合に対する USAGE 権限
- Iceberg テーブルに対する SELECT 権限
- テーブルを作成したデータベースとスキーマへの USAGE 権限を含む
コマンドの例
--外部ボリュームに対する USAGE 権限 GRANT USAGE ON EXTERNAL VOLUME exvol TO ROLE <ロール>; --外部カタログに対する USAGE 権限 GRANT USAGE ON INTEGRATION glueCatalogInt TO ROLE <ロール>; --Icebergテーブルに対するSELECT権限 GRANT USAGE ON DATABASE <データベース> TO ROLE <ロール>; GRANT USAGE ON SCHEMA <データベース>.<スキーマ> TO ROLE <ロール>; GRANT SELECT ON ICEBERG TABLE <データベース>.<スキーマ>.<Icebergテーブル>TO ROLE <ロール>;
メタデータファイルの更新
既存レコードの更新
データソース(SQLServer)側で以下のコマンドを実行しレコードを更新してみます。
UPDATE sampledata SET randomnumber = 1, randomdate = '2024-05-21', randomstring = 'UPDATE' WHERE id = 100; GO
Fivetran による同期完了後 S3 のデータファイルを見てみると更新前は2つだった状態から下図の通り追加されていました。
Snowflake からステージ経由で各ファイルの id の最小・最大値をクエリして確認してみます。
SELECT MIN($1:id::NUMBER) AS min_id, MAX($1:id::NUMBER) AS max_id, METADATA$FILENAME AS filename FROM @my_s3_stage GROUP BY filename;
出力は下図のようになっており、既存のレコードが格納されているデータファイルは残りつつも、変更があったレコードは別ファイルとして追加され、それ以外のレコードが格納されたデータファイルが追加されています。
Icebergテーブルを通して確認する際は以下のコマンドを実行し、メタデータファイルを更新してから参照します。
ALTER ICEBERG TABLE mygluetable REFRESH; SELECT * FROM mygluetable WHERE id = 100;
下図の通り更新後のデータを確認できました。
Iceberg テーブルではなく、ストレージに対して通常のようにクエリすると、変更前のデータファイル自体は削除されていないため、同じ id の複数レコードが出力されます。
WITH temp AS ( SELECT $1:id::NUMBER AS id ,$1:randomstring::TEXT AS randomstring ,$1:randomdate::DATE AS randomdate ,$1:randomnumber::NUMBER AS randomnumber ,$1:_fivetran_deleted::BOOLEAN AS _fivetran_deleted ,$1:_fivetran_synced::TIMESTAMP_NTZ AS _fivetran_synced ,METADATA$FILENAME AS filename FROM @my_s3_stage ) SELECT * FROM temp WHERE id = 100;
Iceberg テーブルでは、カタログを通して変更前のデータファイルへのリンクが外れる形で最新のデータとしては出力されないように制御されます。
レコードの追加
さらにレコードを追加しみます。データソース(SQLServer)側で1000レコード追加し、3000件とし Fivetran で同期を行います。
Iceberg テーブル側では、以下でメタデータを更新しクエリすることで、内容が更新されます。
ALTER ICEBERG TABLE mygluetable REFRESH;
レコードの削除
データソース(SQLServer)側で以下のコマンドを実行し、レコードを削除します。
1> DELETE FROM sampledata 2> WHERE id = 100; 3> GO (1 行処理されました)
Fivetran による同期後、メタデータファイルの更新前は当然変更が反映されていません。
SELECT * FROM mygluetable WHERE id = 100;
上記のコマンドを実行し、メタデータファイルを更新後再度クエリすると、下図の通り更新後のデータにアクセスできます。(Fivetran による同期のため論理削除となっています)
データファイルを確認すると、ファイルがひとつ追加される形で更新されていました。
ファイルに対してクエリしてみると、実データファイルとしてはレコードが残っていることを確認できます。(3レコード)
WITH temp AS ( SELECT $1:id::NUMBER AS id ,$1:randomstring::TEXT AS randomstring ,$1:randomdate::DATE AS randomdate ,$1:randomnumber::NUMBER AS randomnumber ,$1:_fivetran_deleted::BOOLEAN AS _fivetran_deleted ,$1:_fivetran_synced::TIMESTAMP_NTZ AS _fivetran_synced ,METADATA$FILENAME AS filename FROM @my_s3_stage ) SELECT * FROM temp WHERE id = 100 ORDER BY 6;
タイムトラベルの使用
Iceberg テーブルはタイムトラベルに対応しています。
例えば、以下のコマンドで論理削除前のデータを取得できます。
SELECT * FROM mygluetable AT(TIMESTAMP => 'Tue, 21 May 2024 20:00:00 +0900'::timestamp_ltz) WHERE id = 100;
スキーマ移行の検証
列を追加
データベース側で以下のコマンドを実行し、テーブルに列を追加します。
ALTER TABLE sampledata ADD newcolumn VARCHAR(50); GO
SQLServer 側で確認
1> SELECT TOP 10 * FROM sampledata; 2> GO id randomnumber randomdate randomstring newcolumn ----------- ------------ ---------------- ------------------------ -------------------------------------------------- 1 337 2006-02-18 B84E3B7A-A NULL 2 19 2009-09-28 B911ED2D-7 NULL 3 216 2005-01-30 3C48E674-E NULL 4 778 2006-10-19 5FE4AC7C-1 NULL 5 998 2001-02-28 D6A79E15-F NULL 6 304 2004-03-31 5C8AEA83-9 NULL 7 447 2006-10-18 CD08016E-1 NULL 8 636 2000-11-13 0226AC41-6 NULL 9 365 2003-01-02 00B66B98-3 NULL 10 718 2004-08-01 6886EFE3-1 NULL (10 行処理されました)
Fivetran による同期後、メタデータファイルを更新し、Icebergテーブルを参照すると列が追加されてることを確認できます。
ALTER ICEBERG TABLE mygluetable REFRESH; SELECT * FROM mygluetable LIMIT 10;
列を削除
次にデータソース側で列を削除します。
--randomstring列を削除 ALTER TABLE sampledata DROP COLUMN randomstring; GO
Fivetran による同期後、Snowflake 側で同様の手順を行いテーブルを参照すると対象の列の値が NULL に置き換わっていることを確認できます。
ALTER ICEBERG TABLE mygluetable REFRESH; SELECT * FROM mygluetable LIMIT 10;
この際、すでに論理削除済みのレコードについては、この変更は反映されません。
テーブルを追加
さいごに、データソース側の同スキーマに以下のコマンドでテーブルを追加します。
CREATE TABLE customers ( customer_id INT PRIMARY KEY, name VARCHAR(100), email VARCHAR(100), phone_number VARCHAR(15), registration_date DATE ); INSERT INTO customers (customer_id, name, email, phone_number, registration_date) VALUES (1, 'John Doe', '[email protected]', '123-456-7890', '2024-01-01'), (2, 'Jane Smith', '[email protected]', '123-456-7891', '2024-01-02'), (3, 'Alice Johnson', '[email protected]', '123-456-7892', '2024-01-03'), (4, 'Chris Lee', '[email protected]', '123-456-7893', '2024-01-04'), (5, 'Patricia Brown', '[email protected]', '123-456-7894', '2024-01-05'); GO
SQLServer で確認
1> SELECT * FROM customers; 2> GO customer_id name email phone_number registration_date ----------- ---------------------------------------------------------------------------------------------------- ---------------------------------------------------------------------------------------------------- --------------- ----------------- 1 John Doe [email protected] 123-456-7890 2024-01-01 2 Jane Smith [email protected] 123-456-7891 2024-01-02 3 Alice Johnson [email protected] 123-456-7892 2024-01-03 4 Chris Lee [email protected] 123-456-7893 2024-01-04 5 Patricia Brown [email protected] 123-456-7894 2024-01-05 (5 行処理されました)
Fivetran 側でも変更が検知されます。
同期後、S3 側でもテーブル名からなる新しいパスが追加されます。データファイルやメタデータファイルは、テーブルごとのパス配下に格納されます。
Snowflake 側でも Iceberg テーブルとして作成できます。
CREATE ICEBERG TABLE myIcebergTable_customers EXTERNAL_VOLUME='exvol' CATALOG='glueCatalogInt' CATALOG_TABLE_NAME='customers';
作成後、これまでと同様の手順で参照できるようになります。
SELECT * FROM myIcebergTable_customers;
さいごに
Fivetran から S3 にデータを同期し、一部ではありますが、データ更新時にデータファイル自体はどのように変更されるのかを確認しつつ、実際にデータレイクに対して Snowflake の Iceberg テーブルからクエリしてみました。Glue 側の操作も Fivetran 側で実施してくれる点は便利だなと感じました。こちらの内容が何かの参考になれば幸いです。